CF-1871: Add Catalog Update Command #3304
Paras Negi (paras-negi-flink) wants to merge 1 commit into main from
Conversation
🎉 All Contributor License Agreements have been signed. Ready to merge.
Pull request overview
Adds support for updating CMF Kafka Catalogs via the confluent flink catalog update command (Confluent Platform/on-prem), including REST client support and integration test coverage with golden fixtures.
Changes:
- Introduces the `flink catalog update <resourceFilePath>` command (JSON/YAML resource file) and shared catalog output formatting.
- Adds CMF REST client support for updating Kafka catalogs and extends the on-prem test server to handle PUT requests.
- Adds on-prem integration tests plus input/output fixtures and help goldens for the new command.
Reviewed changes
Copilot reviewed 15 out of 15 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| internal/flink/command_catalog.go | Registers the new update subcommand; adds helpers for printing catalog output and reading JSON/YAML resource files. |
| internal/flink/command_catalog_update.go | Implements the flink catalog update command behavior (read file → update → describe → print). |
| pkg/flink/cmf_rest_client.go | Adds UpdateCatalog wrapper around the CMF SDK update endpoint with CLI-friendly error wrapping. |
| test/test-server/flink_onprem_handler.go | Extends the CMF catalog handler to support PUT for update scenarios. |
| test/flink_onprem_test.go | Adds integration test cases for catalog update (JSON + YAML). |
| test/fixtures/input/flink/catalog/* | Adds JSON/YAML resource files for update success and failure cases. |
| test/fixtures/output/flink/catalog/* | Adds golden outputs for update (human/json/yaml), update failure, and update help; updates catalog help to include update. |
```go
func printCatalogOutput(cmd *cobra.Command, sdkOutputCatalog cmfsdk.KafkaCatalog) error {
	if output.GetFormat(cmd) == output.Human {
		table := output.NewTable(cmd)
		databases := make([]string, 0, len(sdkOutputCatalog.Spec.GetKafkaClusters()))
		for _, kafkaCluster := range sdkOutputCatalog.Spec.GetKafkaClusters() {
			databases = append(databases, kafkaCluster.DatabaseName)
		}
		var creationTime string
		if sdkOutputCatalog.GetMetadata().CreationTimestamp != nil {
			creationTime = *sdkOutputCatalog.GetMetadata().CreationTimestamp
		}
		table.Add(&catalogOut{
			CreationTime: creationTime,
			Name:         sdkOutputCatalog.GetMetadata().Name,
			Databases:    databases,
		})
		return table.Print()
	}

	localCatalog := convertSdkCatalogToLocalCatalog(sdkOutputCatalog)
	return output.SerializedOutput(cmd, localCatalog)
}
```
printCatalogOutput centralizes catalog rendering, but catalogCreate and catalogDescribe still contain their own (nearly identical) output formatting logic. To avoid future drift between commands, consider switching those commands to call printCatalogOutput as well.
Robert Metzger (@rmetzger), Kumar Mallikarjuna (@kumar-mallikarjuna): does it make sense to touch other existing files to reduce duplication, or should we tackle that in another change?
I think it makes sense to fix this as part of this PR
Agree with Copilot's and Robert Metzger's (@rmetzger) comment; maybe we can apply this idea to the catalog database as well.
```json
"apiVersion": "",
"kind": "",
```
Why are apiVersion and kind empty?
```yaml
apiVersion: ""
kind: ""
```

```yaml
apiVersion: ""
kind: ""
```
Please fill these fields, and similar golden files.
```json
{
  "databaseName": "test-database-2",
  "connectionConfig": null
}
```
question: Is connectionConfig designed to be null or an empty map?
```go
		return fmt.Errorf("catalog %q was updated successfully, but failed to retrieve updated details: %w", catalogName, err)
	}

	return printCatalogOutput(cmd, sdkOutputCatalog)
```
Let's use this unified print out function for the create command as well.
```go
		return fmt.Errorf("catalog name is required: ensure the resource file contains a non-empty \"metadata.name\" field")
	}

	if err := client.UpdateCatalog(c.createContext(), catalogName, sdkCatalog); err != nil {
```
Same comment as for the catalog database: is it by design that the update command doesn't return the updated catalog, so this isn't an atomic operation? I am seeing an additional describe call below.
```go
{args: "flink catalog update test/fixtures/input/flink/catalog/update-successful.json", fixture: "flink/catalog/update-success.golden"},
{args: "flink catalog update test/fixtures/input/flink/catalog/update-successful.json --output json", fixture: "flink/catalog/update-success-json.golden"},
{args: "flink catalog update test/fixtures/input/flink/catalog/update-successful.json --output yaml", fixture: "flink/catalog/update-success-yaml.golden"},
// failure scenarios with JSON files
{args: "flink catalog update test/fixtures/input/flink/catalog/update-invalid-failure.json", fixture: "flink/catalog/update-invalid-failure.golden", exitCode: 1},
```
Should these be json format files? Or are these totally redundant?
```go
	runIntegrationTestsWithMultipleAuth(s, tests)
}

func (s *CLITestSuite) TestFlinkCatalogUpdateWithYAML() {
```
Suggested change:

```diff
- func (s *CLITestSuite) TestFlinkCatalogUpdateWithYAML() {
+ func (s *CLITestSuite) TestFlinkCatalogUpdateOnPremWithYAML() {
```
```go
		return
	}

	timeStamp := time.Date(2025, time.March, 12, 23, 42, 0, 0, time.UTC).String()
```
Again, although not relevant to this PR, please evaluate the time format from the server and decide whether we should use the plain string or .Format(time.RFC3339).
```go
func (c *command) newCatalogUpdateCommand() *cobra.Command {
	cmd := &cobra.Command{
		Use:   "update <resourceFilePath>",
		Short: "Update a Flink catalog.",
```
Suggested change:

```diff
- Short: "Update a Flink catalog.",
+ Short: "Update a Flink catalog in Confluent Platform.",
```
```go
	cmd := &cobra.Command{
		Use:   "update <resourceFilePath>",
		Short: "Update a Flink catalog.",
		Long:  "Update an existing Kafka Catalog in Confluent Platform from a resource file.",
```
We want to keep it consistent.
Suggested change:

```diff
- Long: "Update an existing Kafka Catalog in Confluent Platform from a resource file.",
+ Long: "Update an existing Flink catalog in Confluent Platform from a resource file.",
```
And by the way, is there a convention for the user to know which fields are mutable in an update operation?




Release Notes

Breaking Changes

New Features
- `confluent flink catalog update` command to update a CMF KafkaCatalog on Confluent Platform.

Bug Fixes
Checklist
- Indicated in the What section below whether this PR applies to Confluent Cloud, Confluent Platform, or both.
- Completed the Test & Review section below.
- Completed the Blast Radius section below.

What
This PR implements CF-1871 — CLI: Update SQL catalog for the Confluent CLI, targeting Confluent Platform / CP Flink (CMF on-prem):

- Adds a new command under `confluent flink catalog`: `confluent flink catalog update <resourceFilePath>`.
- Wires this command to the existing CMF Catalog REST API: `PUT /cmf/api/v1/catalogs/kafka/{catName}`.
- Updates the `CmfRestClient` wrapper for `Catalog` operations and corresponding local types/output formatting, following existing patterns used for catalogs and compute pools.

Blast Radius
- Existing behavior (other `confluent flink` and `confluent kafka` commands) is unchanged.

References
- `KafkaDatabase` and the Flyway migration (e.g., [CF-1772] Add flyway migration for KafkaCatalog -> KafkaDatabase).
Environment
confluentinc/cliCF-18712.3-SNAPSHOT(image:confluentinc/cp-cmf:c505ee8b) - Kubernetes: local cluster with CMF deployed (cmf-serviceexposed viakubectl port-forward svc/cmf-service 8080:80 -n e2e)parasnegi@C6V9RN9V2Y confluent_darwin_arm64_v8.0 % ./confluent flink catalog describe new-kcat --url http://localhost:8080 --output json { "apiVersion": "cmf.confluent.io/v1", "kind": "KafkaCatalog", "metadata": { "name": "new-kcat", "creationTimestamp": "2026-04-03T14:46:29.297Z", "uid": "41acaae2-62f0-4d00-a8db-4235701ac37e", "labels": {}, "annotations": {} }, "spec": { "srInstance": { "connectionConfig": { "url": "http://localhost:8081" } }, "kafkaClusters": [] } }parasnegi@C6V9RN9V2Y confluent_darwin_arm64_v8.0 % ./confluent flink catalog describe new-kcat --url http://localhost:8080 --output json { "apiVersion": "cmf.confluent.io/v1", "kind": "KafkaCatalog", "metadata": { "name": "new-kcat", "creationTimestamp": "2026-04-03T14:46:29.297Z", "uid": "41acaae2-62f0-4d00-a8db-4235701ac37e", "labels": {}, "annotations": {} }, "spec": { "srInstance": { "connectionConfig": { "url": "http://localhost:8082" } }, "kafkaClusters": [] } }Non-empty KafkaCatalog.spec.kafkaClusters is resricted owing to the Flyway Migration